{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feature Transformation with Scikit-Learn In This Notebook\n",
    "## Saving Features into the SageMaker Feature Store\n",
    "\n",
    "In this notebook, we convert raw text into BERT embeddings.  This will allow us to perform natural language processing tasks such as text classification. We save the features into the SageMaker Feature Store.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![](img/prepare_dataset_bert.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "import boto3\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name=\"sagemaker\", region_name=region)\n",
    "s3 = boto3.Session().client(service_name=\"s3\", region_name=region)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![BERT Mania](img/bert_mania.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Convert Raw Text to BERT Features using Hugging Face and TensorFlow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import collections\n",
    "import json\n",
    "import os\n",
    "import pandas as pd\n",
    "import csv\n",
    "from transformers import DistilBertTokenizer\n",
    "\n",
    "tokenizer = DistilBertTokenizer.from_pretrained(\"distilbert-base-uncased\")\n",
    "\n",
    "REVIEW_BODY_COLUMN = \"review_body\"\n",
    "REVIEW_ID_COLUMN = \"review_id\"\n",
    "# DATE_COLUMN = 'date'\n",
    "\n",
    "LABEL_COLUMN = \"star_rating\"\n",
    "LABEL_VALUES = [1, 2, 3, 4, 5]\n",
    "\n",
    "label_map = {}\n",
    "for (i, label) in enumerate(LABEL_VALUES):\n",
    "    label_map[label] = i\n",
    "\n",
    "\n",
    "class InputFeatures(object):\n",
    "    \"\"\"BERT feature vectors.\"\"\"\n",
    "\n",
    "    def __init__(self, input_ids, input_mask, segment_ids, label_id, review_id, date, label):\n",
    "        self.input_ids = input_ids\n",
    "        self.input_mask = input_mask\n",
    "        self.segment_ids = segment_ids\n",
    "        self.label_id = label_id\n",
    "        self.review_id = review_id\n",
    "        self.date = date\n",
    "        self.label = label\n",
    "        \n",
    "\n",
    "class Input(object):\n",
    "    \"\"\"A single training/test input for sequence classification.\"\"\"\n",
    "\n",
    "    def __init__(self, text, review_id, date, label=None):\n",
    "        \"\"\"Constructs an Input.\n",
    "        Args:\n",
    "          text: string. The untokenized text of the first sequence. For single\n",
    "            sequence tasks, only this sequence must be specified.\n",
    "          label: (Optional) string. The label of the example. This should be\n",
    "            specified for train and dev examples, but not for test examples.\n",
    "        \"\"\"\n",
    "        self.text = text\n",
    "        self.review_id = review_id\n",
    "        self.date = date\n",
    "        self.label = label\n",
    "\n",
    "\n",
    "def convert_input(the_input, max_seq_length):\n",
    "    # First, we need to preprocess our data so that it matches the data BERT was trained on:\n",
    "    # 1. Lowercase our text (if we're using a BERT lowercase model)\n",
    "    # 2. Tokenize it (i.e. \"sally says hi\" -> [\"sally\", \"says\", \"hi\"])\n",
    "    # 3. Break words into WordPieces (i.e. \"calling\" -> [\"call\", \"##ing\"])\n",
    "    #\n",
    "    # Fortunately, the Transformers tokenizer does this for us!\n",
    "\n",
    "    tokens = tokenizer.tokenize(the_input.text)\n",
    "    tokens.insert(0, '[CLS]')\n",
    "    tokens.append('[SEP]')\n",
    "    print(\"**{} tokens**\\n{}\\n\".format(len(tokens), tokens))\n",
    "\n",
    "    encode_plus_tokens = tokenizer.encode_plus(\n",
    "        the_input.text,\n",
    "        padding='max_length',\n",
    "        max_length=max_seq_length,\n",
    "        truncation=True\n",
    "    )\n",
    "\n",
    "    # The id from the pre-trained BERT vocabulary that represents the token.  (Padding of 0 will be used if the # of tokens is less than `max_seq_length`)\n",
    "    input_ids = encode_plus_tokens[\"input_ids\"]\n",
    "\n",
    "    # Specifies which tokens BERT should pay attention to (0 or 1).  Padded `input_ids` will have 0 in each of these vector elements.\n",
    "    input_mask = encode_plus_tokens[\"attention_mask\"]\n",
    "\n",
    "    # Segment ids are always 0 for single-sequence tasks such as text classification.  1 is used for two-sequence tasks such as question/answer and next sentence prediction.\n",
    "    segment_ids = [0] * max_seq_length\n",
    "\n",
    "    # Label for each training row (`star_rating` 1 through 5)\n",
    "    label_id = label_map[the_input.label]\n",
    "\n",
    "    features = InputFeatures(\n",
    "        input_ids=input_ids,\n",
    "        input_mask=input_mask,\n",
    "        segment_ids=segment_ids,\n",
    "        label_id=label_id,\n",
    "        review_id=the_input.review_id,\n",
    "        date=the_input.date,\n",
    "        label=the_input.label,\n",
    "    )\n",
    "\n",
    "    print(\"**input_ids**\\n{}\\n\".format(features.input_ids))\n",
    "    print(\"**input_mask**\\n{}\\n\".format(features.input_mask))\n",
    "    print(\"**segment_ids**\\n{}\\n\".format(features.segment_ids))\n",
    "    print(\"**label_id**\\n{}\\n\".format(features.label_id))\n",
    "    print(\"**review_id**\\n{}\\n\".format(features.review_id))\n",
    "    print(\"**date**\\n{}\\n\".format(features.date))\n",
    "    print(\"**label**\\n{}\\n\".format(features.label))\n",
    "\n",
    "    return features\n",
    "\n",
    "\n",
    "# We'll need to transform our data into a format that BERT understands.\n",
    "# - `text` is the text we want to classify, which in this case, is the `Request` field in our Dataframe.\n",
    "# - `label` is the star_rating label (1, 2, 3, 4, 5) for our training input data\n",
    "def transform_inputs_to_tfrecord(inputs, output_file, max_seq_length):\n",
    "    records = []\n",
    "    tf_record_writer = tf.io.TFRecordWriter(output_file)\n",
    "\n",
    "    for (input_idx, the_input) in enumerate(inputs):\n",
    "        if input_idx % 10000 == 0:\n",
    "            print(\"Writing input {} of {}\\n\".format(input_idx, len(inputs)))\n",
    "\n",
    "        features = convert_input(the_input, max_seq_length)\n",
    "\n",
    "        all_features = collections.OrderedDict()\n",
    "\n",
    "        # Create TFRecord With input_ids, input_mask, segment_ids, and label_ids\n",
    "        all_features[\"input_ids\"] = tf.train.Feature(int64_list=tf.train.Int64List(value=features.input_ids))\n",
    "        all_features[\"input_mask\"] = tf.train.Feature(int64_list=tf.train.Int64List(value=features.input_mask))\n",
    "        all_features[\"segment_ids\"] = tf.train.Feature(int64_list=tf.train.Int64List(value=features.segment_ids))\n",
    "        all_features[\"label_ids\"] = tf.train.Feature(int64_list=tf.train.Int64List(value=[features.label_id]))\n",
    "\n",
    "        tf_record = tf.train.Example(features=tf.train.Features(feature=all_features))\n",
    "        tf_record_writer.write(tf_record.SerializeToString())\n",
    "\n",
    "        # Create Record For Feature Store With All Features\n",
    "        records.append(\n",
    "            {  #'tf_record': tf_record.SerializeToString(),\n",
    "                \"input_ids\": features.input_ids,\n",
    "                \"input_mask\": features.input_mask,\n",
    "                \"segment_ids\": features.segment_ids,\n",
    "                \"label_id\": features.label_id,\n",
    "                \"review_id\": the_input.review_id,\n",
    "                \"date\": the_input.date,\n",
    "                \"label\": features.label,\n",
    "                #                        'review_body': features.review_body\n",
    "            }\n",
    "        )\n",
    "\n",
    "    tf_record_writer.close()\n",
    "\n",
    "    return records"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Three(3) feature vectors are created from each raw review (`review_body`) during the feature engineering phase to prepare for BERT processing:\n",
    "\n",
    "* **`input_ids`**:  The id from the pre-trained BERT vocabulary that represents the token.  (Padding of 0 will be used if the # of tokens is less than `max_seq_length`)\n",
    "    \n",
    "* **`input_mask`**:  Specifies which tokens BERT should pay attention to (0 or 1).  Padded `input_ids` will have 0 in each of these vector elements.\n",
    "\n",
    "* **`segment_ids`**:  Segment ids are always 0 for single-sequence tasks such as text classification.  1 is used for two-sequence tasks such as question/answer and next sentence prediction.\n",
    "\n",
    "And one(1) label is created from each raw review (`star_rating`)  :\n",
    "\n",
    "* **`label_id`**:  Label for each training row (`star_rating` 1 through 5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Demonstrate the BERT-specific Feature Engineering Step\n",
    "While we are demonstrating this code with a small amount of data here in the notebook, we will soon scale this to much more data on a powerful SageMaker cluster."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Feature Store requires an Event Time feature\n",
    "\n",
    "We need a record identifier name and an event time feature name. This will match the column of the corresponding features in our data. \n",
    "\n",
    "Note: Event time date feature type provided Integral. Event time type should be either Fractional(Unix timestamp in seconds) or String (ISO-8601 format) type."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "from time import strftime\n",
    "\n",
    "# timestamp = datetime.now().replace(microsecond=0).isoformat()\n",
    "timestamp = datetime.now().strftime(\"%Y-%m-%dT%H:%M:%SZ\")\n",
    "print(timestamp)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "data = [\n",
    "    [\n",
    "        5,\n",
    "        \"ABCD12345\",\n",
    "        \"\"\"I needed an \"antivirus\" application and know the quality of Norton products.  This was a no brainer for me and I am glad it was so simple to get.\"\"\",\n",
    "    ],\n",
    "    [\n",
    "        3,\n",
    "        \"EFGH12345\",\n",
    "        \"\"\"The problem with ElephantDrive is that it requires the use of Java. Since Java is notorious for security problems I haveit removed from all of my computers. What files I do have stored are photos.\"\"\",\n",
    "    ],\n",
    "    [\n",
    "        1,\n",
    "        \"IJKL2345\",\n",
    "        \"\"\"Terrible, none of my codes worked, and I can't uninstall it.  I think this product IS malware and viruses\"\"\",\n",
    "    ],\n",
    "]\n",
    "\n",
    "df = pd.DataFrame(data, columns=[\"star_rating\", \"review_id\", \"review_body\"])\n",
    "\n",
    "# Use the InputExample class from BERT's run_classifier code to create examples from the data\n",
    "inputs = df.apply(\n",
    "    lambda x: Input(label=x[LABEL_COLUMN], text=x[REVIEW_BODY_COLUMN], review_id=x[REVIEW_ID_COLUMN], date=timestamp),\n",
    "    axis=1,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make sure the date is in the correct ISO-8601 format for Feature Store\n",
    "print(inputs[0].date)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Save TFRecords\n",
    "\n",
    "The three(3) features vectors and one(1) label are converted into a list of `TFRecord` instances (1 per each row of training data):\n",
    "* **`tf_records`**:  Binary representation of each row of training data (3 features + 1 label)\n",
    "\n",
    "These `TFRecord`s are the engineered features that we will use throughout the rest of the pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output_file = \"./data-tfrecord-featurestore/data.tfrecord\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Add Features to SageMaker Feature Store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## SageMaker Feature Store Runtime\n",
    "\n",
    "A low-level client representing Amazon SageMaker Feature Store Runtime\n",
    "\n",
    "Contains all data plane API operations and data types for the Amazon SageMaker Feature Store. Use this API to put, delete, and retrieve (get) features from a feature store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featurestore_runtime = boto3.Session().client(service_name=\"sagemaker-featurestore-runtime\", region_name=region)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create FeatureGroup\n",
    "\n",
    "A feature group is a logical grouping of features, defined in the Feature Store, to describe records. A feature group definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store.\n",
    "\n",
    "Create feature group, describe feature group, update feature groups, delete feature group and list feature groups APIs can be used to manage feature groups.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from time import gmtime, strftime, sleep\n",
    "\n",
    "feature_group_name = \"reviews-feature-group-\" + strftime(\"%d-%H-%M-%S\", gmtime())\n",
    "print(feature_group_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.feature_store.feature_definition import (\n",
    "    FeatureDefinition,\n",
    "    FeatureTypeEnum,\n",
    ")\n",
    "\n",
    "feature_definitions = [\n",
    "    FeatureDefinition(feature_name=\"input_ids\", feature_type=FeatureTypeEnum.STRING),\n",
    "    FeatureDefinition(feature_name=\"input_mask\", feature_type=FeatureTypeEnum.STRING),\n",
    "    FeatureDefinition(feature_name=\"segment_ids\", feature_type=FeatureTypeEnum.STRING),\n",
    "    FeatureDefinition(feature_name=\"label_id\", feature_type=FeatureTypeEnum.INTEGRAL),\n",
    "    FeatureDefinition(feature_name=\"review_id\", feature_type=FeatureTypeEnum.STRING),\n",
    "    FeatureDefinition(feature_name=\"date\", feature_type=FeatureTypeEnum.STRING),\n",
    "    FeatureDefinition(feature_name=\"label\", feature_type=FeatureTypeEnum.INTEGRAL),\n",
    "    #    FeatureDefinition(feature_name='review_body', feature_type=FeatureTypeEnum.STRING),\n",
    "    FeatureDefinition(feature_name=\"split_type\", feature_type=FeatureTypeEnum.STRING),\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.feature_store.feature_group import FeatureGroup\n",
    "\n",
    "feature_group = FeatureGroup(name=feature_group_name, feature_definitions=feature_definitions, sagemaker_session=sess)\n",
    "print(feature_group)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Specify `record identifier` and `event time` features"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "record_identifier_feature_name = \"review_id\"\n",
    "event_time_feature_name = \"date\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set S3 Prefix for Offline Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "prefix = \"reviews-feature-store-\" + timestamp\n",
    "print(prefix)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Feature Group\n",
    "\n",
    "The last step for creating the feature group is to use the `create` function. The online store is not created by default, so we must set this as `True` if we want to enable it. The `s3_uri` is the location of our offline store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_group.create(\n",
    "    s3_uri=f\"s3://{bucket}/{prefix}\",\n",
    "    record_identifier_name=record_identifier_feature_name,\n",
    "    event_time_feature_name=event_time_feature_name,\n",
    "    role_arn=role,\n",
    "    enable_online_store=False,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Describe the Feature Group"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_group.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## List All Feature Groups\n",
    "\n",
    "We use the boto3 SageMaker client to list all FeatureGroups."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# sm.list_feature_groups()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Wait For The Feature Group Creation Complete\n",
    "\n",
    "Creating a feature group takes time as the data is loaded. We will need to wait until it is created before you can use it. You can check status using the following method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "\n",
    "def wait_for_feature_group_creation_complete(feature_group):\n",
    "    status = feature_group.describe().get(\"FeatureGroupStatus\")\n",
    "    while status == \"Creating\":\n",
    "        print(\"Waiting for Feature Group Creation\")\n",
    "        time.sleep(5)\n",
    "        status = feature_group.describe().get(\"FeatureGroupStatus\")\n",
    "    if status != \"Created\":\n",
    "        raise RuntimeError(f\"Failed to create feature group {feature_group.name}\")\n",
    "    print(f\"FeatureGroup {feature_group.name} successfully created.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "wait_for_feature_group_creation_complete(feature_group=feature_group)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Review The Records To Ingest Into Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "max_seq_length = 64\n",
    "records = transform_inputs_to_tfrecord(inputs, output_file, max_seq_length)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Ingest Records into Feature Store\n",
    "\n",
    "After the FeatureGroups have been created, we can put data into the FeatureGroups by using the `PutRecord` API. \n",
    "\n",
    "This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to S3 in chunks. \n",
    "\n",
    "The files will be written to the offline store within a few minutes of ingestion. To accelerate the ingestion process, we can specify multiple workers to do the job simultaneously. \n",
    "\n",
    "Use `put_record(...)` to put a single record in the FeatureGroup.\n",
    "\n",
    "Use `ingest(...)` to ingest the content of a pandas DataFrame to Feature Store. You can set the `max_worker` to the number of threads to be created to work on different partitions of the `data_frame` in parallel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "df_records = pd.DataFrame.from_dict(records)\n",
    "df_records[\"split_type\"] = \"train\"\n",
    "df_records"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Cast DataFrame `Object` to Supported Feature Store Data Type `String`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def cast_object_to_string(data_frame):\n",
    "    for label in data_frame.columns:\n",
    "        if data_frame.dtypes[label] == \"object\":\n",
    "            data_frame[label] = data_frame[label].astype(\"str\").astype(\"string\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cast_object_to_string(df_records)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_group.ingest(data_frame=df_records, max_workers=3, wait=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Wait For Data In Offline Feature Store To Become Available\n",
    "\n",
    "Creating a feature group takes time as the data is loaded. We will need to wait until it is created before we can use it. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "offline_store_contents = None\n",
    "\n",
    "while offline_store_contents is None:\n",
    "    objects_in_bucket = s3.list_objects(Bucket=bucket, Prefix=prefix)\n",
    "    if \"Contents\" in objects_in_bucket and len(objects_in_bucket[\"Contents\"]) > 1:\n",
    "        offline_store_contents = objects_in_bucket[\"Contents\"]\n",
    "    else:\n",
    "        print(\"Waiting for data in offline store...\\n\")\n",
    "        sleep(60)\n",
    "\n",
    "print(\"Data available.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## _Wait For The Cell Above To Complete and show `Data available`._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get Record From Online Feature Store\n",
    "\n",
    "Use for OnlineStore serving from a FeatureStore. Only the latest records stored in the OnlineStore can be retrieved. If no Record with `RecordIdentifierValue` is found, then an empty result is returned."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "# record_identifier_value = \"IJKL2345\"\n",
    "\n",
    "# featurestore_runtime.get_record(\n",
    "#     FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=record_identifier_value\n",
    "# )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build Training Dataset\n",
    "\n",
    "SageMaker FeatureStore automatically builds the Glue Data Catalog for FeatureGroups (we can optionally turn it on/off while creating the FeatureGroup). We can create a training dataset by querying the data in the feature store. This is done by utilizing the auto-built Catalog and run an Athena query. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create An Athena Query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_store_query = feature_group.athena_query()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Get The Feature Group Table Name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_store_table = feature_store_query.table_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build an Athena SQL Query\n",
    "\n",
    "Show Hive DDL commands to define or change structure of tables or databases in Hive. The schema of the table is generated based on the feature definitions. Columns are named after feature name and data-type are inferred based on feature type. \n",
    "\n",
    "Integral feature type is mapped to INT data-type. Fractional feature type is mapped to FLOAT data-type. String feature type is mapped to STRING data-type."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "print(feature_group.as_hive_ddl())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query_string = \"\"\"\n",
    "SELECT input_ids, input_mask, segment_ids, label_id, split_type  FROM \"{}\" WHERE split_type='train' LIMIT 5\n",
    "\"\"\".format(\n",
    "    feature_store_table\n",
    ")\n",
    "\n",
    "print(\"Running \" + query_string)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run Athena Query\n",
    "The query results are stored in a S3 bucket."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_store_query.run(query_string=query_string, output_location=\"s3://\" + bucket + \"/\" + prefix + \"/query_results/\")\n",
    "\n",
    "feature_store_query.wait()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## View Query Results\n",
    "\n",
    "Load query results in a Pandas DataFrame."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset = pd.DataFrame()\n",
    "\n",
    "dataset = feature_store_query.as_dataframe()\n",
    "\n",
    "dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review the Feature Store\n",
    "\n",
    "![Feature Store](img/feature_store_sm_extension.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "\n",
    "<p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "<button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "        \n",
    "<script>\n",
    "try {\n",
    "    els = document.getElementsByClassName(\"sm-command-button\");\n",
    "    els[0].click();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}    \n",
    "</script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "try {\n",
    "    Jupyter.notebook.save_checkpoint();\n",
    "    Jupyter.notebook.session.delete();\n",
    "}\n",
    "catch(err) {\n",
    "    // NoOp\n",
    "}"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
